Setting Up a Single-Node Hadoop and Loading Stock Data into HDFS
The environment is a 64-bit Ubuntu 16 system.
Environment Setup
First, follow the official Hadoop documentation to set up a single-node system: download the tarball, unpack it, and configure the JDK, after which Hadoop is ready to run. Here we use the pseudo-distributed mode on a single machine and modify the configuration files as follows, with the NameNode address set to hdfs://localhost:9005.
etc/hadoop/core-site.xml
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9005</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/home/vagrant/download/hadoop-2.7.2/data</value>
    </property>
</configuration>
etc/hadoop/hdfs-site.xml
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>
Run the following commands in Bash to format the NameNode and start (or later stop) HDFS:
$ bin/hdfs namenode -format
$ sbin/start-dfs.sh
$ sbin/stop-dfs.sh
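After start-dfs.sh it is worth confirming that the daemons actually came up (stop-dfs.sh is only needed when shutting everything down later). Two quick checks:

$ jps                  # should list NameNode, DataNode and SecondaryNameNode
$ bin/hdfs dfs -ls /   # the HDFS root should be browsable (empty at this point)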
Uploading Stock Data to HDFS
The stock data comes from NetEase Finance (网易财经). A Python script saves the latest prices of all stocks into a single file named after the date, e.g. 2016-08-21.csv.
python script: save_price_daily_csv.py
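The script itself is not reproduced in this post. The sketch below only illustrates its role under stated assumptions: the quote URL and JSON field names are placeholders, and the output layout, at least six comma-separated columns with the stock code in column 0 and the day's change as a decimal fraction in column 3, is inferred from what the mapper further down expects.

# save_price_daily_csv.py -- a minimal sketch, NOT the original script.
# QUOTE_URL and the q[...] field names are placeholder assumptions.
import csv
import datetime
import json
from urllib.request import urlopen

QUOTE_URL = "http://example.com/latest-quotes?codes={}"  # placeholder endpoint

def fetch_quotes(codes):
    """Fetch the latest quotes for the given stock codes (hypothetical API)."""
    with urlopen(QUOTE_URL.format(",".join(codes))) as resp:
        return json.loads(resp.read().decode("utf-8"))

def save_daily_csv(quotes, out_dir="/vagrant/tmp"):
    """Write one row per stock: code, name, price, change ratio, volume, turnover."""
    path = "{}/{}.csv".format(out_dir, datetime.date.today().isoformat())
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        for q in quotes:
            writer.writerow([q["code"], q["name"], q["price"],
                             q["percent"], q["volume"], q["turnover"]])

if __name__ == "__main__":
    save_daily_csv(fetch_quotes(["600000", "000001"]))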
Use the Hadoop command-line tools to store the generated CSV in HDFS:
$ bin/hdfs dfs -mkdir -p /user/vagrant/stock-daily-price
$ bin/hdfs dfs -put /vagrant/tmp/2016-08-21.csv stock-daily-price/
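Optionally, confirm the upload; the relative path resolves under /user/vagrant because that is the vagrant user's HDFS home directory:

$ bin/hdfs dfs -ls stock-daily-price/
$ bin/hdfs dfs -cat stock-daily-price/2016-08-21.csv | head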
A MapReduce Program to Count Advances and Declines
Modeled on the WordCount example, the code below was written by copying its structure; only the map() function needs to be rewritten. Add the Hadoop-related JARs to the build (a possible dependency declaration is sketched below), then compile and package the project as HapTet-1.0-SNAPSHOT.jar.
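The build file is not shown in this post. Assuming a Maven project (the name HapTet-1.0-SNAPSHOT.jar follows Maven's artifactId-version convention, so this is an inference), a single dependency on hadoop-client matching the 2.7.2 installation above is enough to compile against the MapReduce API:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.2</version>
    <scope>provided</scope>
</dependency>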
Run the job on the uploaded CSV and print the result:

$ bin/hadoop jar /vagrant/tmp/HapTet-1.0-SNAPSHOT.jar com.viifly.hadoop.ta1.StockCount stock-daily-price/2016-08-21.csv out/
$ bin/hdfs dfs -cat ./out/*
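Note that MapReduce refuses to start if the output directory already exists, so remove out/ before re-running the job:

$ bin/hdfs dfs -rm -r out/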
Java code:
package com.viifly.hadoop.ta1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class StockCount {

    // Buckets for the daily change: up/down by more or less than 3%.
    public static final String UP3 = "UP3";
    public static final String UP0 = "UP0";
    public static final String DW0 = "DW0";
    public static final String DW3 = "DW3";

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(",");
            // Only count codes starting with 0 or 6 (Shenzhen/Shanghai A-shares);
            // field 3 holds the day's change as a decimal fraction.
            if (fields.length > 5
                    && (fields[0].charAt(0) == '0' || fields[0].charAt(0) == '6')) {
                float percent = Float.parseFloat(fields[3]) * 100;
                String t;
                if (percent >= 3) {
                    t = UP3;        // up 3% or more
                } else if (percent >= 0) {
                    t = UP0;        // up less than 3% (including unchanged)
                } else if (percent >= -3) {
                    t = DW0;        // down 3% or less
                } else {
                    t = DW3;        // down more than 3%
                }
                word.set(t);
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Stock Count");
        job.setJarByClass(StockCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}