package producerrandom; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; // inspired by the example code from // https://flink.apache.org/news/2015/02/09/streaming-example.html @SuppressWarnings("WeakerAccess") public class RandomProducer { public static void main(String[] args) throws Exception { Logger LOG = LoggerFactory.getLogger(RandomProducer.class); LOG.info("public static void main(String[] args)"); ParameterTool parameters = ParameterTool.fromArgs(args); int sourcesCount = parameters.getInt("sources", 3); boolean hasVerbose = parameters.has("verbose"); // --sources // optional argument; will initialize sources, pushed to // different topics; if it is not set, --sources 3 will be used by default // // --verbose // optional argument; if it is set, all messages from all sources will // be pushed to a topic called "verbose" in a human readable format // // Example usage: // no arguments is equivalent to: --sources 3 // --sources 1 is equivalent to: --sources 1 // --verbose is equivalent to: --sources 3 --verbose // --sources 5 --verbose is equivalent to: --sources 5 --verbose if (hasVerbose) { LOG.info("command line arguments: --sources {} --verbose", sourcesCount); } else { LOG.info("command line arguments: --sources {}", sourcesCount); } final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // add sourcesCount sources to the StreamExecutionEnvironment: for (int i = 0; i < sourcesCount; ++i) { RandomSource randomSource = new RandomSource(); LOG.info("new RandomSource()"); LOG.info("randomSource.symbol={}", randomSource.symbol); LOG.info("randomSource.value={}", randomSource.value); DataStream randomStream = env.addSource(randomSource).name(RandomSource.class.toString()); randomStream .map(new MapFunction() { @Override public String map(Price price) { return price.toSimpleString(); } }) .addSink(new FlinkKafkaProducer08<>("master:9092", randomSource.symbol, new SimpleStringSchema())) .name(randomSource.symbol); LOG.info(".addSink(new FlinkKafkaProducer08<>([...], \"{}\", [...]))", randomSource.symbol); // push all messages from all sources to a topic called "verbose" // if --verbose has been set as a command line argument (see above): if (hasVerbose) { randomStream .map(new MapFunction() { @Override public String map(Price price) { return price.toString(); } }) .addSink(new FlinkKafkaProducer08<>("master:9092", "verbose", new SimpleStringSchema())) .name("verbose"); LOG.info(".addSink(new FlinkKafkaProducer08<>([...], \"verbose\", [...]))"); } } env.execute(); } }