|
|
@ -112,6 +112,61 @@ class File extends EventEmitter { |
|
|
|
}); |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// create a parse method which read the file and return a stream
|
|
|
|
async parse(columns) { |
|
|
|
const stream = new PassThrough(); |
|
|
|
|
|
|
|
// check if columns is valid
|
|
|
|
if (!columns || columns.length) { |
|
|
|
return Promise.reject(new Error('Invalid columns')); |
|
|
|
} |
|
|
|
|
|
|
|
// Create a variable to hold csv columns indexes
|
|
|
|
const indexes = []; |
|
|
|
|
|
|
|
let i = 0; |
|
|
|
fs.createReadStream(this.filepath) |
|
|
|
.pipe(csvParser({ separator: ';' })) |
|
|
|
.on('headers', (headers) => { |
|
|
|
// Emit a parse.start event with the url and filepath
|
|
|
|
this.emit('parse.start', { url: this.url, filepath: this.filepath, headers }); |
|
|
|
}) |
|
|
|
.on('data', (data) => { |
|
|
|
// Emit a parse.data event with the url, filepath and data
|
|
|
|
this.emit('parse.data', { url: this.url, filepath: this.filepath, data }); |
|
|
|
|
|
|
|
if ( i === 0 ) { |
|
|
|
// Loop through columns and push the index of the column to indexes
|
|
|
|
columns.forEach((column) => { |
|
|
|
indexes.push(headers.indexOf(column)); |
|
|
|
}); |
|
|
|
} else { |
|
|
|
// Loop through indexes and push the value of the column to result
|
|
|
|
const result = this.processRow(data, columns, indexes); |
|
|
|
stream.write(result.join(';')); |
|
|
|
} |
|
|
|
i++; |
|
|
|
}) |
|
|
|
.on('error', (err) => { |
|
|
|
// Emit a parse.error event with the error
|
|
|
|
this.emit('parse.error', { url: this.url, filepath: this.filepath, error: err.message }); |
|
|
|
}) |
|
|
|
.on('end', () => { |
|
|
|
// Emit a parse.end event with the url and filepath
|
|
|
|
this.emit('parse.end', { url: this.url, filepath: this.filepath }); |
|
|
|
stream.end(); |
|
|
|
}); |
|
|
|
|
|
|
|
return stream; |
|
|
|
} |
|
|
|
|
|
|
|
processRow(data, columns, columnsIndex) { |
|
|
|
const result = []; |
|
|
|
|
|
|
|
return result; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
module.exports = File; |