
Processing XML with Hadoop: An Example

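A few days ago I interviewed at 亿阳信通, where a rather heavyset fellow asked me a question I couldn't answer. I looked into it once I got home, and I hope the result helps others.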

The question was: how does Hadoop process structured data, such as large amounts of XML?

Here is the answer. (Unfortunately, it seems to work only with the old API, i.e. org.apache.hadoop.mapred.)

package com.liangc.hadoop.mr;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.streaming.StreamInputFormat;
import org.apache.hadoop.streaming.StreamXmlRecordReader;

public class XmlMR {
	public static class MyMap extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
		@Override
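		public void map(Text key, Text value, OutputCollector<Text, Text> ctx, Reporter r) throws IOException {
			// StreamXmlRecordReader hands the whole <property>...</property> record over as the key;
			// the value is empty, so the record is emitted as both key and value.
			System.out.println("map :::::: "+key.toString());
			ctx.collect(key, key);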
		}
	}

	public static class MyReduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
		@Override
		public void reduce(Text key, Iterator<Text> value, OutputCollector<Text, Text> ctx, Reporter r) throws IOException {
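			// Only this call touches the buffer, so the unsynchronized StringBuilder is enough.
			StringBuilder sb = new StringBuilder();
			while(value.hasNext()){
				Text v = value.next();
				System.out.println("reduce :::::: "+v.toString());
				sb.append(v.toString());
			}
			// Output key: byte length of the record; output value: the record text.
			ctx.collect(new Text(key.getLength()+""), new Text(sb.toString()));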
		}
	}

	public static void main(String[] args) throws Exception {
		String input = args[0];
		String output = args[1];
		String begin = "<property>";
		String end = "</property>";
		
		JobConf conf = new JobConf(XmlMR.class);
		conf.setJobName("xmlMR");
		
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(Text.class);

		conf.setMapperClass(MyMap.class);
		conf.setReducerClass(MyReduce.class);

		conf.setInputFormat(StreamInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);
		
		MultipleInputs.addInputPath(conf, new Path(input), StreamInputFormat.class,MyMap.class);
		FileOutputFormat.setOutputPath(conf, new Path(output));

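		// Use the XML record reader, and treat everything from the begin marker
		// through the end marker as one record.
		conf.set("stream.recordreader.class", StreamXmlRecordReader.class.getName());
		conf.set("stream.recordreader.begin", begin );
		conf.set("stream.recordreader.end", end );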
		
		JobClient.runJob(conf);
	}

}
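
To make the role of the begin and end markers a bit more concrete, here is a small standalone sketch of the idea: cut out every span that starts with the begin marker and ends with the end marker, and treat each span as one record. This is only my illustration, not the actual StreamXmlRecordReader implementation; the class name RecordSplitSketch and the method split are made up for this example, and it works on an in-memory string rather than on HDFS input splits.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: mimics the idea of cutting records out of a text by
// begin/end markers. This is NOT Hadoop's StreamXmlRecordReader code.
class RecordSplitSketch {

    // Returns every substring that starts with `begin` and ends with `end`.
    static List<String> split(String text, String begin, String end) {
        List<String> records = new ArrayList<>();
        int from = 0;
        while (true) {
            int start = text.indexOf(begin, from);
            if (start < 0) {
                break; // no further begin marker
            }
            int stop = text.indexOf(end, start + begin.length());
            if (stop < 0) {
                break; // unterminated record: ignore it
            }
            stop += end.length();
            records.add(text.substring(start, stop));
            from = stop;
        }
        return records;
    }

    public static void main(String[] args) {
        String xml = "<configuration>\n"
                + "  <property><name>a</name><value>1</value></property>\n"
                + "  <property><name>b</name><value>2</value></property>\n"
                + "</configuration>";
        // Print each record prefixed by its length, similar in spirit to the job output.
        for (String record : split(xml, "<property>", "</property>")) {
            System.out.println(record.length() + "\t" + record);
        }
    }
}
```

Anything outside the markers (here the surrounding configuration element) is simply skipped, which is why the mapper only ever sees property blocks.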


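I uploaded the XML files from Hadoop's conf directory to HDFS as test data and ran the job with the following command (replace the paths with your own):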
hadoop jar xmlMR.jar com.liangc.hadoop.mr.XmlMR /user/hadoop/conf/* /user/hadoop/conf/out/13032901

Result:
......
300 <property>
    <name>mapred.capacity-scheduler.default-init-accept-jobs-factor</name>
    <value>10</value>
    <description>The default multipe of (maximum-system-jobs * queue-capacity)
    used to determine the number of jobs which are accepted by the scheduler. 
    </description>
  </property>
355 <property>
    <name>mapred.capacity-scheduler.default-maximum-active-tasks-per-queue</name>
    <value>200000</value>
    <description>The default maximum number of tasks, across all jobs in the
    queue, which can be initialized concurrently. Once the queue's jobs exceed
    this limit they will be queued on disk. 
    </description>
  </property>
......

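I was excited to get this result, because once each XML record reaches the job intact like this, I can process the data in MapReduce however I need and derive whatever results I want.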
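
As one example of such further processing, a mapper could parse each record and pull out the name and value fields. Below is a minimal sketch using the XML parser that ships with the JDK. It runs outside Hadoop; the class PropertyRecordSketch and its methods are hypothetical names for this illustration, and it assumes each record is a well-formed property element like the ones in the output above.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Illustrative only: parses one <property>...</property> record of the kind
// the job above delivers to the mapper. All names here are hypothetical.
class PropertyRecordSketch {

    // Returns {name, value} for one record; a missing element yields "".
    static String[] nameAndValue(String record) {
        try {
            DocumentBuilder builder =
                    DocumentBuilderFactory.newInstance().newDocumentBuilder();
            Document doc = builder.parse(new InputSource(new StringReader(record)));
            return new String[] { textOf(doc, "name"), textOf(doc, "value") };
        } catch (Exception e) {
            // Malformed records are reported to the caller as bad input.
            throw new IllegalArgumentException("not a well-formed record", e);
        }
    }

    // Trimmed text of the first element with the given tag, or "" if absent.
    private static String textOf(Document doc, String tag) {
        NodeList nodes = doc.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? "" : nodes.item(0).getTextContent().trim();
    }

    public static void main(String[] args) {
        String record = "<property>\n"
                + "    <name>mapred.capacity-scheduler.default-init-accept-jobs-factor</name>\n"
                + "    <value>10</value>\n"
                + "  </property>";
        String[] nv = nameAndValue(record);
        System.out.println(nv[0] + " = " + nv[1]);
    }
}
```

Inside a real mapper, the same call would take key.toString() as its input, and the name and value could then be emitted with ctx.collect instead of printed.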
Comments
#1 developerinit 2013-08-14
Nice post.
