Reduce Side Join (Secondary Sorting) program code.
Custom Key
public class EmployeeKey implements WritableComparable<EmployeeKey>{
Text StateID;
IntWritable type;
public Text getStateID() {
return StateID;
}
public void setStateID(Text stateID) {
StateID = stateID;
}
public IntWritable getType() {
return type;
}
public void setType(IntWritable type) {
this.type = type;
}
public EmployeeKey() {
// TODO Auto-generated constructor stub
StateID = new Text();
type =new IntWritable();
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.StateID.readFields(in);
this.type.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
this.StateID.write(out);
this.type.write(out);
}
@Override
public int compareTo(EmployeeKey o) {
// TODO Auto-generated method stub
int cmp = 0;
cmp = this.StateID.compareTo(o.getStateID());
if(cmp == 0){
cmp = this.type.compareTo(o.getType());
}
return cmp;
}
}
public class EmployeeKey implements WritableComparable<EmployeeKey>{
Text StateID;
IntWritable type;
public Text getStateID() {
return StateID;
}
public void setStateID(Text stateID) {
StateID = stateID;
}
public IntWritable getType() {
return type;
}
public void setType(IntWritable type) {
this.type = type;
}
public EmployeeKey() {
// TODO Auto-generated constructor stub
StateID = new Text();
type =new IntWritable();
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.StateID.readFields(in);
this.type.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
this.StateID.write(out);
this.type.write(out);
}
@Override
public int compareTo(EmployeeKey o) {
// TODO Auto-generated method stub
int cmp = 0;
cmp = this.StateID.compareTo(o.getStateID());
if(cmp == 0){
cmp = this.type.compareTo(o.getType());
}
return cmp;
}
}
Custom Value
public class EmployeeValue implements Writable{
Text Fname;
Text Lname;
Text state;
public Text getFname() {
return Fname;
}
public void setFname(Text fname) {
Fname = fname;
}
public Text getLname() {
return Lname;
}
public void setLname(Text lname) {
Lname = lname;
}
public Text getState() {
return state;
}
public void setState(Text state) {
this.state = state;
}
public EmployeeValue() {
// TODO Auto-generated constructor stub
Fname = new Text();
Lname = new Text();
state = new Text();
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.getFname().readFields(in);
this.getLname().readFields(in);
this.getState().readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
this.getFname().write(out);
this.getLname().write(out);
this.getState().write(out);
}
}
Mapper
public class maper extends Mapper<LongWritable, Text, EmployeeKey, EmployeeValue>{
int type = 0;
EmployeeKey ekey = new EmployeeKey();
EmployeeValue evalue = new EmployeeValue();
@Override
protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
Path fileName = ((FileSplit)context.getInputSplit()).getPath();
String file = fileName.getName();
if (file.equalsIgnoreCase("emp_data")){
type = 1;
}else{
type= 0;
}
}
@Override
protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
String [] val = value.toString().trim().split(",");
if(type == 0){
ekey.setStateID(new Text(val[0]));
ekey.setType(new IntWritable(type));
evalue.setFname(new Text(""));
evalue.setLname(new Text(""));
evalue.setState(new Text(val[1]));
}else{
ekey.setStateID(new Text(val[6]));
ekey.setType(new IntWritable(type));
evalue.setFname(new Text(val[0]));
evalue.setLname(new Text(val[1]));
evalue.setState(new Text(""));
}
context.write(ekey, evalue);
}
}
Grouping Comparator
public class Groupingcmp extends WritableComparator{
protected SecSortGroupingcmp() {
super(EmployeeKey.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
// TODO Auto-generated method stub
EmployeeKey p1= (EmployeeKey) a;
EmployeeKey p2= (EmployeeKey) b;
int cmp = p1.getStateID().compareTo(p2.getStateID());
return cmp;
}
}
Partitioner
public class Partition extends Partitioner<EmployeeKey, EmployeeValue>{
@Override
public int getPartition(EmployeeKey key, EmployeeValue arg1, int resd) {
// TODO Auto-generated method stub
return (key.getStateID().hashCode() & Integer.MAX_VALUE)% resd;
}
}
Reducer
public class Redude extends Reducer<EmployeeKey, EmployeeValue, Text, Text> {
@Override
protected void reduce(EmployeeKey key, Iterable<EmployeeValue> values,Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
Iterator<EmployeeValue> itr = values.iterator();
EmployeeValue val = itr.next();
String state = val.getState().toString();
if(state.equalsIgnoreCase("")){
state = "NO state found for this ID";
}
while(itr.hasNext()){
EmployeeValue val1 = itr.next();
context.write(new Text(val1.getFname()), new Text(state + key.getStateID()));
}
}
}
Comments
Post a Comment