package producerrandom;

import org.apache.flink.streaming.api.functions.source.SourceFunction; // extends Serializable

import java.util.Random;

/**
 * Source that emits an unbounded sequence of {@link Price} records for a single,
 * randomly named symbol whose value performs a Gaussian random walk.
 *
 * <p>The symbol and the start value are chosen once, in the constructor. Each loop
 * iteration of {@link #run} emits the current value stamped with
 * {@link System#currentTimeMillis()}, sleeps for a random time below one second and then
 * moves the value by a normally distributed step.
 *
 * <p>The only state shared between threads is the {@code cancelled} flag, which is
 * {@code volatile} so that a {@link #cancel()} issued from another thread becomes visible
 * to the emitting loop.
 */
@SuppressWarnings("WeakerAccess")
public class RandomSource implements SourceFunction<Price> {

    private static final long serialVersionUID = 1L;

    /** Characters a generated symbol suffix is drawn from (digits, then A-Z). */
    private static final String ALPHANUMERIC = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";

    /** Fixed leading part of every generated symbol. */
    private static final String SYMBOL_PREFIX = "XF0000";

    /** Number of random characters appended after {@link #SYMBOL_PREFIX}. */
    private static final int RANDOM_SYMBOL_CHARS = 5;

    /**
     * Trailing character of every generated symbol: an invalid check digit that
     * distinguishes randomly generated "ISINs" from regular ISINs.
     */
    private static final String INVALID_CHECK_DIGIT = "X";

    /** Lower bound (inclusive) of the start value. */
    private static final double MIN_START_VALUE = 10.0;

    /** Width of the start value range; the start value lies in [10,100). */
    private static final int START_VALUE_RANGE = 90;

    /** Exclusive upper bound, in milliseconds, of the pause between two emissions. */
    private static final int MAX_SLEEP_MILLIS = 1000;

    /** Scale applied to the N(0,1) step so that the value moves more smoothly. */
    private static final double STEP_SCALE = 0.5;

    /** Set by {@link #cancel()}; checked before every emission. */
    private volatile boolean cancelled = false;

    /** Symbol the emitted prices belong to; fixed at construction time. */
    public final String symbol;

    /** The price's current value; initialised to the value to start from. */
    public Double value;

    /**
     * Creates a source with a random symbol that looks like an International Securities
     * Identification Number (ISIN) in most cases, and a random whole-numbered start value
     * in [10,100).
     */
    public RandomSource() {
        Random random = new Random();
        this.symbol = randomSymbol(random);
        this.value = MIN_START_VALUE + random.nextInt(START_VALUE_RANGE); // [10,100)
    }

    /**
     * Emits prices until {@link #cancel()} is called.
     *
     * @param sourceContext context the generated {@link Price} records are collected into
     * @throws InterruptedException if the thread is interrupted while sleeping between
     *     two emissions
     */
    @Override
    public void run(SourceContext<Price> sourceContext) throws InterruptedException {
        // A fresh generator per run keeps the instance free of non-serializable state.
        Random random = new Random();
        while (!this.cancelled) {
            Price price = new Price(this.symbol, System.currentTimeMillis(), this.value);
            sourceContext.collect(price);

            Thread.sleep(random.nextInt(MAX_SLEEP_MILLIS)); // [0,1000)

            double delta = random.nextGaussian() * STEP_SCALE; // N(0,1), damped
            this.value = this.value + delta;
        }
    }

    /** Requests that the loop in {@link #run} stops before its next emission. */
    @Override
    public void cancel() {
        this.cancelled = true;
    }

    /** Builds "XF0000" + five random alphanumeric characters + the invalid check digit "X". */
    private static String randomSymbol(Random random) {
        StringBuilder symbolBuilder = new StringBuilder(SYMBOL_PREFIX);
        for (int i = 0; i < RANDOM_SYMBOL_CHARS; ++i) {
            int index = random.nextInt(ALPHANUMERIC.length()); // [0,36)
            symbolBuilder.append(ALPHANUMERIC.charAt(index));
        }
        symbolBuilder.append(INVALID_CHECK_DIGIT);
        return symbolBuilder.toString();
    }
}